Skip to main content

Using Link to communicate with Broker

Link, specifically LocalLink can be used to perform publish and subscribe actions. To get LinkTx ( used for sending MQTT packets like publish / subscribe to router ) and LinkRx ( used for receiving Notifications from router ) link("__") method can be used:

let (mut link_tx, mut link_rx) = broker.link("link_name")?;

Then, link_tx can be used to send packets:

link_tx.subscribe("topic_name")?;
link_tx.publish("topic_name", "payload")?;
// try_xx acts like try_send in channels, it won't block / wait
// until there is space for sending this events to router
// If there is no space, it will return an error.
// ```
// link_tx.try_subscribe("topic_name")?;
// link_tx.try_publish("topic_name", "payload")?;
// ```

// if you are using it in async fn, you can also use send()
// for sending raw packets ( you might never need to use it! )
// e.g.
// ```
// use rumqttd::protocol::{Packet, Unsubscribe};
//
// let unsub_packet = Unsubscribe {
// pkid: 0,
// filters: vec!["topic".into()],
// };
// // ^ this is just example, plz fill with appropriate values!
//
// let data = Packet::Unsubscribe(unsub_packet, None);
// link_tx.send(data).await?;
// ```

On other hand, link_rx is used for receiving the Notification from router:

let notification = link_rx.recv()?;
// you can also specify Instance as deadline using recv_deadline(_)
// ```
// let duration = Duration::from_secs(5);
// let deadline = Instant::now().checked_add(duration).unwrap();
// let notification = link_rx.recv_deadline(deadline).unwrap();
// ```

NOTE: You can think of link as creating local client!

Simple example of using link to subscribe for all messages ( i.e. # ) and receiving notifications:

use rumqttd::{Broker, Config, Notification};

use std::thread;

fn main() {
let builder = tracing_subscriber::fmt()
.pretty()
.with_line_number(false)
.with_file(false)
.with_thread_ids(false)
.with_thread_names(false);

builder
.try_init()
.expect("initialized subscriber succesfully");

let config = config::Config::builder()
.add_source(config::File::with_name("rumqttd.toml"))
.build()
.unwrap();

let config: Config = config.try_deserialize().unwrap();

let mut broker = Broker::new(config);
let (mut link_tx, mut link_rx) = broker.link("singlenode").unwrap();
thread::spawn(move || {
broker.start().unwrap();
});

link_tx.subscribe("#").unwrap();

let mut count = 0;
loop {
let notification = match link_rx.recv().unwrap() {
Some(v) => v,
None => continue,
};

match notification {
Notification::Forward(forward) => {
count += 1;
println!(
"Topic = {:?}, Count = {}, Payload = {} bytes",
forward.publish.topic,
count,
forward.publish.payload.len()
);
}
v => {
println!("{v:?}");
}
}
}
}